BigQuery テーブル作成イベントで、description の日本語をカラム名にしたビューをバッチ処理で作成してみた。

BigQuery テーブル作成イベントで、description の日本語をカラム名にしたビューをバッチ処理で作成してみた。

Clock Icon2024.09.21

こんにちは、みかみです。

好きな童謡はやぎさんゆうびんです。
自分で食べちゃったのに「しーかたがないので」お手紙書いちゃう行動力!エンドレスなほのぼの感がなんとも。

やりたいこと

  • BigQuery データを利用する時に、日本語名でアクセスしたい
  • BigQuery テーブルカラム名は日本語にしたくない
  • BigQuery にテーブルが作成されたら、description の日本語をカラム名にしたビューを自動で作成したい

データ利用者からすると、データ参照では日本語の項目の方が使いやすいケースも多いのではないかと思います。
BigQuery では日本語カラム名でテーブルやビューを作成できます。

とはいえ、カラム名が日本語の場合、SQLのカラム名をバッククォートでくくる必要があったり、バッチ処理および運用保守で扱うには少し手間がかかります。
バッチ処理で description に日本語を持つテーブルを作成した後に、description をカラム名としたビューを作成したら良いのでは?
とはいえ、毎回手動でビューを作成するのは手間がかかって大変です。
では、テーブル作成イベントをトリガにバッチ処理でビュー作成できたら便利では?

本ブログでは、Dataform から BigQuery テーブルを作成するケースを想定しています。

図にするとこんな感じ。

create_view_overview

なお、カラム名の日本語サポートは、2024/09/20現在まだプレビューの機能なので、ご利用の際にはご留意ください。

前提

Google Cloud SDK(gcloud コマンド)の実行環境は準備済みであるものとします。 本エントリでは、Cloud Shell を使用しました。

また、BigQuery や Dataform など各サービス操作に必要な API の有効化と必要な権限は付与済みです。

Cloud Run Functions 実行用のサービスアカウントは前回作成したものを流用するので、サービスアカウント作成手順は省略します。
※本ブログ処理内容では roles/dataplex.admin は不要です。

なお、文中、プロジェクトIDなど一部の文字は伏字に変更しています。

Dataform 実装

以下の Dataform コードを実装済みです。

code_dataform

work.table_test を参照して、日本語 description 付きの work.mart_test を作成します。

Cloud Run 関数をデプロイ

Python コードを準備

以下のPythonコードを、main.py というファイル名で保存しました。

from google.cloud import bigquery
import functions_framework
import os
import time

# INFORMATION_SCHEMAからカラムとdescriptionを取得するためのテンプレート
query_template = """
SELECT
  c.column_name,
  c.data_type,
  c.is_nullable,
  cf.description,
  c.ordinal_position,
  cf.field_path
FROM
  `{project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS` AS c
LEFT JOIN
  `{project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS` AS cf
ON
  c.table_catalog = cf.table_catalog
  AND c.table_schema = cf.table_schema
  AND c.table_name = cf.table_name
  AND c.column_name = cf.column_name
WHERE
  c.table_name = '{table_id}'
ORDER BY
  c.ordinal_position
"""

# ビューのSQLを作成するためのテンプレート
view_sql_template = """
CREATE OR REPLACE VIEW `{project_id}.{dataset_id}.{view_id}` AS
SELECT
  {columns_sql}
FROM
  `{project_id}.{dataset_id}.{table_id}`
"""

def create_view(project_id, dataset_id, table_id):
    client = bigquery.Client()

    # カラム情報取得SQL作成
    query = query_template.format(project_id=project_id, dataset_id=dataset_id, table_id=table_id)
    # クエリを実行
    query_job = client.query(query)
    results = query_job.result()

    # カラム名とdescriptionを取得
    columns = []
    has_nested_structure = False
    missing_description = False
    for row in results:
        column_name = row['column_name']
        data_type = row['data_type']
        description = row['description']

        # ネスト構造を持つデータ型がある場合は終了
        if 'STRUCT' in data_type or 'RECORD' in data_type or 'ARRAY' in data_type or 'JSON' in data_type:
            has_nested_structure = True
            break

        # descriptionがないカラムがある場合は終了
        if not description:
            missing_description = True
            break

        columns.append(f"`{column_name}` AS `{description}`")

    # ネスト構造がある場合やdescriptionがないカラムがある場合は終了
    if has_nested_structure:
        print("[functions]The table contains nested structures. View creation aborted.")
        return
    elif missing_description:
        print("[functions]One or more columns are missing descriptions. View creation aborted.")
        return
    else:
        # CREATE VIEW SQL作成
        view_id = f"v_{table_id}"
        columns_sql = ",\n  ".join(columns)
        view_sql = view_sql_template.format(project_id=project_id, dataset_id=dataset_id, table_id=table_id, view_id=view_id, columns_sql=columns_sql)

        # ビューを作成
        client.query(view_sql).result()
        print(f"[functions]View `{view_id}` created successfully.")
        return

@functions_framework.cloud_event
def main(cloudevent):
    print(f"Event type: {cloudevent['type']}")
    if 'subject' in cloudevent:
        print(f"Subject: {cloudevent['subject']}")

    # 環境変数チェック
    PROJECT_ID = os.getenv('PROJECT_ID')
    SA_DATAFORM = os.getenv('SA_DATAFORM')
    if not PROJECT_ID or not SA_DATAFORM:
        raise Exception('Invalid env val.')

    # イベントデータチェック
    payload = cloudevent.data.get("protoPayload")
    if not payload:
        return
    principalEmail = payload.get('authenticationInfo', {}).get('principalEmail', None)
    if principalEmail != SA_DATAFORM:
        # Dataformからのクエリじゃない場合は処理せず終了
        return
    jobChange = payload.get('metadata', {}).get('jobChange', None)
    if not jobChange:
        return
    statementType = jobChange.get('job', {}).get('jobConfig', {}).get('queryConfig', {}).get('statementType', None)
    if statementType != 'CREATE_TABLE_AS_SELECT':
        return
    statementType = jobChange.get('job', {}).get('jobConfig', {}).get('queryConfig', {}).get('statementType', None)
    destinationTable = jobChange.get('job', {}).get('jobConfig', {}).get('queryConfig', {}).get('destinationTable', None)
    dst_dataset = destinationTable.split('/')[3]
    dst_table = destinationTable.split('/')[5]
    print(f"[functions] dst_dataset: {dst_dataset} / dst_table: {dst_table}")

    # TODO: すぐに実行するとdescription情報が取れないことがあったので、10秒待機
    time.sleep(10)

    create_view(PROJECT_ID, dst_dataset, dst_table)

処理実行に必要な情報を環境変数から取得し、トリガイベントの監査ログ内容をチェックします。

テーブル作成イベントだった場合は、INFORMATION_SCHEMA からテーブルカラムと description 情報を取得し、ビュー作成クエリを実行します。
RECORDJSON 型など、ネスト構造のデータ型をテーブルには対応していません(ビュー作成せず終了します)。
また、description がないカラムがあった場合も、ビュー作成はせず終了します。

また、以下を requirements.txt というファイル名で保存しました。

functions-framework==3.*
google-cloud-bigquery>=3.25.0

デプロイ

先ほどのファイルを保存したのと同じディレクトリで、以下のコマンドを実行して Cloud Run Functions をデプロイしました。

gcloud functions deploy create_view \
    --gen2 \
    --region asia-northeast1 \
    --runtime python312 \
    --entry-point=main \
    --trigger-event-filters="type=google.cloud.audit.log.v1.written" \
    --trigger-event-filters="serviceName=bigquery.googleapis.com" \
    --trigger-event-filters="methodName=google.cloud.bigquery.v2.JobService.InsertJob" \
    --trigger-location=asia-northeast1 \
    --service-account=sa-functions@[PROJECT_ID].iam.gserviceaccount.com \
    --trigger-service-account=sa-functions@[PROJECT_ID].iam.gserviceaccount.com \
    --set-env-vars PROJECT_ID=[PROJECT_ID],SA_DATAFORM=service-[PROJECT_NO]@gcp-sa-dataform.iam.gserviceaccount.com

実行

Dataform を実行して、テーブル作成後にビューが作成されるか確認します。

exec_dataform_mart_test

Dataform 実行後、Cloud Run Functions のログを確認してみると

log_create_view

ビューが作成できたようです。

BigQuery コンソールからも確認してみます。

以下、Dataform から作成した、mart_test テーブルのスキーマと格納データです。

mart_test_schema

mart_test_preview

Cloud Run Functions から作成されたビューも確認してみます。

v_mart_test_schema

v_mart_test_select

想定通り、テーブルデータを日本語カラムのビューから参照することができました。

まとめ(所感)

前回に引き続き、Dataform からのテーブル作成(再作成)ケースを想定していますが、Functions コードの Dataform サービスアカウントチェック部分を変更すれば、SQL 実行元にかかわらず Functions を起動することができます。

また、前回&今回はテーブル作成(再作成)イベントをトリガに処理実行していますが、こちらも、Functions コードのイベントチェック部分を変更すれば、データ INSERT や UPDATE イベントなどをトリガに任意の処理を実行することが可能です。

BigQuery の更新イベントをトリガに、Cloud Run Functions を簡単に実行することができます。
Cloud Run Functions で他のサービスと連携するコードを実装すれば、データの自動処理やリアルタイム分析など、さまざまな処理が可能になります。
必要に応じて、ぜひご活用ください。

参考

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.